package com.app.server;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import com.app.config.DCacheConfig;
import com.app.db.DBPool;
import com.app.model.TableInfo;
import com.app.store.AbstractMySqlStore;
import com.app.store.KeyValueStore;
import com.app.store.StoreFactory;

/**
 * Data update service: every table gets one update runnable and one delete runnable,
 * all scheduled on a shared {@link ScheduledExecutorService}.
 *
 * <p>I. Update queue</p>
 * <p>Putting data into the update queue takes two steps:</p>
 * <p>1. A redis map whose key is {@code updateQueueTable|database name|table name|primary key}.
 * The value is a map holding the data to update, where key ==&gt; columnName and
 * value ==&gt; columnValue. If the table is key/value structured, the map keys are
 * {@link KeyValueStore#COLUMN_KEY}, {@link KeyValueStore#COLUMN_VALUE} and the names of the
 * indexed attributes.</p>
 * <p>2. A redis set whose key is {@code updateQueueTable|database name|table name}.
 * The members of the set are the keys from step 1
 * (i.e. {@code updateQueueTable|database name|table name|primary key}).</p>
 *
 * <p>II. Delete queue</p>
 * <p>Deleting data takes a single step: a redis set whose key is
 * {@code delQueueTable|database name|table name}; the members of the set are the primary keys
 * of the rows to delete.</p>
 *
 * @author lisong_otd
 *
 */
public class UpdateServer {

	private static final Logger logger = Logger.getLogger(UpdateServer.class);
	
	private static AbstractMySqlStore store = StoreFactory.newColumnStore();
	
	/** Set by the shutdown hook; once true the table scan stops rescheduling itself. */
	private static volatile boolean shutdown;
	
	/**
	 * Running queue runnables; the key is the QueueRunnable's queueKey.
	 */
	public static final ConcurrentHashMap<String,QueueRunnable> queue = new ConcurrentHashMap<String,QueueRunnable>();
	
	private static ScheduledExecutorService service = Executors.newScheduledThreadPool(DCacheConfig.getInstance().getUpdateThreadNum());
	
	
	/**
	 * Schedules the periodic table scan. Each run loads the table map from the store,
	 * starts missing queue runnables via {@link #startThread(Map)} and then schedules
	 * itself again after {@code QueueRunnable.UPDATE_INTERVAL} seconds.
	 *
	 * The scan is rescheduled even when no tables are returned or when a run fails,
	 * so a transient problem does not permanently stop table discovery.
	 */
	private static void start() {
		
		service.schedule(new Runnable() {
			
			@Override
			public void run() {
				if(shutdown) {
					return;
				}
				try {
					Map<String,TableInfo> tables = store.cacheTables();
					if(tables == null || tables.isEmpty()) {
						// Do not call startThread here: with an empty map it would stop every running runnable.
						logger.warn("no tables................");
					} else {
						startThread(tables);
					}
				} catch (RuntimeException e) {
					// An exception escaping run() would be captured by the returned future
					// and the scan would silently never run again.
					logger.error("table scan failed, will retry on the next cycle", e);
				} finally {
					rescheduleScan(this);
				}
			}
		}, 0, TimeUnit.SECONDS);
	}
	
	/**
	 * Schedules the next run of the table scan unless the server is shutting down.
	 *
	 * @param scan the scan task to run again after {@code QueueRunnable.UPDATE_INTERVAL} seconds
	 */
	private static void rescheduleScan(Runnable scan) {
		if(shutdown) {
			return;
		}
		try {
			service.schedule(scan, QueueRunnable.UPDATE_INTERVAL, TimeUnit.SECONDS);
		} catch (RejectedExecutionException e) {
			// The shutdown hook may shut the executor down between the flag check and this call.
			logger.info("executor is shutting down, table scan not rescheduled");
		}
	}
	
	/**
	 * Removes the update and delete runnables of the given table from {@link #queue}
	 * when they report {@code done}, so that {@link #startThread(Map)} can start new ones.
	 *
	 * @param tableName name of the table whose runnables are checked
	 */
	private static void checkThread(String tableName) {
		removeIfDone(store.genUpdateQueueKeyLevel1(tableName));
		removeIfDone(store.genDelQueueKey(tableName));
	}
	
	/**
	 * Removes the runnable registered under {@code key} if it exists and reports {@code done}.
	 *
	 * @param key queue key to check
	 */
	private static void removeIfDone(String key) {
		QueueRunnable run = queue.get(key);
		if(run != null && run.done) {
			// two-argument remove: only drops the entry if it still maps to this instance
			queue.remove(key, run);
		}
	}
	
	/**
	 * Makes sure every table in {@code tables} has an update runnable and a delete runnable
	 * registered in {@link #queue} and scheduled, and stops runnables whose table is no
	 * longer present in {@code tables}.
	 *
	 * @param tables current tables, keyed by table name
	 */
	private static void startThread(Map<String,TableInfo> tables) {
		for(String tableName:tables.keySet()) {
			checkThread(tableName);
			String key = store.genUpdateQueueKeyLevel1(tableName);
			if(!queue.containsKey(key)) {
				UpdateQueueRunnable uq = new UpdateQueueRunnable(tableName,store,service);
				// putIfAbsent decides the winner if another caller registered the key meanwhile
				if(queue.putIfAbsent(key,uq) == null) {
					service.schedule(uq, 0, TimeUnit.SECONDS);
					logger.info(uq.queueKey+" started......");
				}
			}
			
			key = store.genDelQueueKey(tableName);
			
			if(!queue.containsKey(key)) {
				DelQueueRunnable dq = new DelQueueRunnable(tableName,store,service);
				if(queue.putIfAbsent(key,dq) == null) {
					service.schedule(dq, 0, TimeUnit.SECONDS);
					logger.info(dq.queueKey+" started......");
				}
			}
		}
				
		for(Map.Entry<String,QueueRunnable> entry : queue.entrySet()) {
			QueueRunnable run = entry.getValue();
			if(tables.containsKey(run.tableName)) {
				continue;
			}
			// The table is gone (dropped?). checkThread only runs for tables that still exist,
			// so finished runnables of removed tables have to be cleaned up here.
			if(run.done) {
				queue.remove(entry.getKey(), run);
			} else {
				run.setStop(true);// ask the runnable to stop
			}
		}
	}
	
	/**
	 * Entry point: configures log4j from {@code log4j.properties} in the working directory,
	 * registers the shutdown hook and starts the table scan.
	 *
	 * @param args command line arguments (unused)
	 */
	public static void main(String[] args) {
		PropertyConfigurator.configure(System.getProperty("user.dir") + "/log4j.properties");// location of the log4j configuration
		// Register the hook before starting so a termination during startup is still cleaned up.
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			@Override
			public void run() {
				shutdown = true;
				for(QueueRunnable run : queue.values()) {
					run.setStop(true);
				}
				service.shutdown();
				DBPool.getInstance().close();
			}
		}));
		try {
			UpdateServer.start();
		} catch (Exception e) {
			logger.error("failed to start update server", e);
			// also print to stderr in case log4j could not be configured
			e.printStackTrace();
			// non-zero status so callers and supervisors can detect the failed start
			System.exit(1);
		}
	}
}


